/*
 * Copyright (c) 2022. China Mobile (SuZhou) Software Technology Co.,Ltd. All rights reserved.
 * Lakehouse is licensed under Mulan PSL v2.
 * You can use this software according to the terms and conditions of the Mulan PSL v2.
 * You may obtain a copy of Mulan PSL v2 at:
 *          http://license.coscl.org.cn/MulanPSL2
 * THIS SOFTWARE IS PROVIDED ON AN "AS IS" BASIS, WITHOUT WARRANTIES OF ANY KIND,
 * EITHER EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO NON-INFRINGEMENT,
 * MERCHANTABILITY OR FIT FOR A PARTICULAR PURPOSE.
 * See the Mulan PSL v2 for more details.
 */

package com.chinamobile.cmss.lakehouse.engine.meta.crawler.infer

import com.chinamobile.cmss.lakehouse.engine.meta.crawler.model._
import com.chinamobile.cmss.lakehouse.engine.meta.crawler.util.FSUtils
import com.chinamobile.cmss.lakehouse.engine.meta.crawler.{CrawlerConfig, LocalFile, OssSource}
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.{FileSystem, Path}
import org.apache.parquet.hadoop.ParquetFileReader
import org.apache.parquet.hadoop.util.HadoopInputFile
import org.apache.parquet.schema.OriginalType
import org.slf4j.LoggerFactory

import scala.collection.JavaConverters._
import scala.util.{Failure, Success, Try}


/**
 * Strategy for inferring the column schema of data located at a filesystem path.
 *
 * Implementations in the companion object infer from a single Parquet file or
 * from a directory of Parquet files (see [[ColumnsInfer.ParquetColumnsInfo]]
 * and [[ColumnsInfer.DirectoryColumnsInfer]]).
 */
trait ColumnsInfer {
  /**
   * Infers the columns (name and type) for the data at `path`.
   *
   * @param path filesystem location to inspect
   * @return `Success` with the inferred column list, or `Failure` wrapping
   *         the error encountered while reading metadata
   */
  def infer(path: Path): Try[List[ColumnInfo]]
}

object ColumnsInfer {

  /**
   * Infers column types by reading the footer (metadata) of a single Parquet
   * file, without scanning any data pages.
   *
   * @param crawlerConfig crawler configuration; its source decides which
   *                      Hadoop configuration (OSS credentials vs. local) is used
   */
  case class ParquetColumnsInfo(crawlerConfig: CrawlerConfig)
    extends ColumnsInfer {

    // Hadoop configuration derived from the crawler source. For an OSS source
    // this carries the endpoint/credentials needed to open remote files.
    val configuration: Configuration = crawlerConfig.source match {
      case source: OssSource =>
        FSUtils.getConfigurationFromOssSource(source)
      case _: LocalFile =>
        new Configuration()
    }

    /**
     * Reads the Parquet footer at `path` and maps each primitive field to a
     * [[ColumnInfo]].
     *
     * @param path location of a single Parquet file
     * @return inferred columns, or `Failure` on any (non-fatal) read error
     */
    override def infer(path: Path): Try[List[ColumnInfo]] = Try {
      // BUGFIX: use the source-aware `configuration` here. The original code
      // opened the file with a fresh `new Configuration()`, silently ignoring
      // the OSS credentials/endpoint computed above.
      val reader = ParquetFileReader.open(HadoopInputFile.fromPath(path, configuration))
      try {
        val fields = reader.getFileMetaData.getSchema.getFields.asScala
        fields.map { field =>
          val primitive = field.asPrimitiveType().getPrimitiveTypeName()
          // Dispatch on the Java accessor name Parquet associates with each
          // primitive type (e.g. INT64 -> "getLong").
          val columnType = primitive.getMethod match {
            case "getBoolean" => BooleanType
            case "getInteger" => IntType
            case "getLong" => BigIntType
            case "getFloat" => FloatType
            case "getDouble" => DoubleType
            case "getBinary" =>
              // Several physical types share the "getBinary" accessor; refine
              // by the primitive type's own name.
              primitive.name() match {
                case "FIXED_LEN_BYTE_ARRAY" =>
                  // Fixed-length binaries are treated as decimals here;
                  // precision/scale come from the field's decimal metadata.
                  val decimalMeta = field.asPrimitiveType().getDecimalMetadata
                  DecimalType(decimalMeta.getPrecision, decimalMeta.getScale)
                case "INT96" =>
                  // Legacy timestamp encoding (Impala/older Hive writers).
                  TimeStampType
                case _
                  if field.getOriginalType != null && field.getOriginalType.equals(
                    OriginalType.UTF8
                  ) =>
                  StringType
                case _ => BinaryType
              }
            case _ => StringType
          }
          ColumnInfo(field.getName, columnType)
        }.toList
      } finally {
        // BUGFIX: the footer reader was previously never closed (leaked the
        // underlying input stream on every inference).
        reader.close()
      }
      // NOTE: Try {} catches only NonFatal throwables; the original caught
      // Throwable, which would also swallow OOM/InterruptedException.
    }
  }

  /**
   * Infers the schema for a directory of Parquet files: every file directly
   * under the path is inferred individually and all results must agree.
   *
   * @param crawlerConfig crawler configuration; its source decides which
   *                      [[FileSystem]] is used for listing
   */
  case class DirectoryColumnsInfer(crawlerConfig: CrawlerConfig)
    extends ColumnsInfer {

    // Filesystem used to list the directory contents.
    val fs: FileSystem = crawlerConfig.source match {
      case source: OssSource =>
        val path = FSUtils.wrapS3Path(source.path)
        FSUtils.getFileSystem(source, path)
      case _: LocalFile =>
        // BUGFIX: the match was non-exhaustive — a LocalFile source threw
        // MatchError. A default Configuration resolves to the local filesystem.
        FileSystem.get(new Configuration())
    }

    // Per-file inference delegate.
    val infer: ColumnsInfer = ParquetColumnsInfo(crawlerConfig)
    private val logger = LoggerFactory.getLogger(getClass)

    /**
     * Infers the directory schema. Fails when the directory is empty or when
     * the files under it do not all share one schema.
     *
     * @param path directory to inspect
     * @return the common schema, or `Failure` describing the mismatch
     */
    override def infer(path: Path): Try[List[ColumnInfo]] = {
      val inferred = fs.listStatus(path).map(f => infer.infer(f.getPath))
      inferred.headOption match {
        case None =>
          // BUGFIX: an empty directory used to throw NoSuchElementException
          // from `.head`; report it as a proper Failure instead.
          Failure(new RuntimeException(s"Failed to find metadata! No files under path: $path"))
        case Some(head) if inferred.exists(_ != head) =>
          // Note: any per-file Failure also compares unequal here, so mixed
          // success/failure is reported as a schema mismatch (original behavior).
          logger.warn(
            s"path: ${path.toString} infer schema failure different schema"
          )
          Failure(new RuntimeException("Failed to find metadata! Files under the path have different schema"))
        case Some(head) => head
      }
    }
  }
}
